package com.caiya.kafka.support;

import java.util.Map;

import com.caiya.kafka.util.PatternMatchUtils;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.messaging.MessageHeaders;

/**
 * A simple header mapper that maps headers directly; for outbound,
 * only byte[] headers are mapped; for inbound, headers are mapped
 * unchanged, as byte[].
 * Most headers in {@link KafkaHeaders} are not mapped on outbound messages.
 * The exceptions are correlation and reply headers for request/reply
 *
 * @author Gary Russell
 * @since 2.1.3
 */
public class SimpleKafkaHeaderMapper extends AbstractKafkaHeaderMapper {

    /**
     * Construct an instance with the default object mapper and default header patterns
     * for outbound headers; all inbound headers are mapped. The default pattern list is
     * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
     * {@link KafkaHeaders} are never mapped as headers since they represent data in
     * consumer/producer records.
     */
    public SimpleKafkaHeaderMapper() {
        super();
    }

    /**
     * Construct an instance with a default object mapper and the provided header patterns
     * for outbound headers; all inbound headers are mapped. The patterns are applied in
     * order, stopping on the first match (positive or negative). Patterns are negated by
     * preceding them with "!". The patterns will replace the default patterns; you
     * generally should not map the {@code "id" and "timestamp"} headers. Note:
     * most of the headers in {@link KafkaHeaders} are never mapped as headers since they
     * represent data in consumer/producer records.
     *
     * @param patterns the patterns.
     * @see PatternMatchUtils#simpleMatch(String, String)
     */
    public SimpleKafkaHeaderMapper(String... patterns) {
        super(patterns);
    }

    @Override
    public void fromHeaders(MessageHeaders headers, Headers target) {
        headers.forEach((k, v) -> {
            if (v instanceof byte[] && matches(k, v)) {
                target.add(new RecordHeader(k, (byte[]) v));
            }
        });
    }

    @Override
    public void toHeaders(Headers source, Map<String, Object> target) {
        source.forEach(header -> target.put(header.key(), header.value()));
    }

}
